package cn.zhaopin.starter.mq.config;

import cn.zhaopin.starter.mq.annotation.PulsarMessageListener;
import cn.zhaopin.starter.mq.constant.MQConstant;
import cn.zhaopin.starter.mq.core.PulsarConsumerFactory;
import cn.zhaopin.starter.mq.core.PulsarListener;
import cn.zhaopin.starter.mq.properties.MQProperties;
import org.apache.pulsar.client.api.PulsarClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.aop.scope.ScopedProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.support.BeanDefinitionValidationException;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.GenericApplicationContext;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.messaging.converter.MessageConverter;
import org.springframework.util.StringUtils;

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

/**
 * Description: listener container configuration for Pulsar consumers.
 *
 * <p>After all singletons have been instantiated, every bean annotated with
 * {@link PulsarMessageListener} gets its own {@link DefaultPulsarListenerContainer}, which is
 * registered as a bean in the application context and then started.
 *
 * <p>This configuration is only active when the {@code type} property under
 * {@code MQConstant.PREFIX} equals {@code MQConstant.MqType.PULSAR}.
 *
 * @author: zuomin (myleszelic@outlook.com)
 * @date: 2021/07/20-10:39
 */
@SuppressWarnings("SpringJavaInjectionPointsAutowiringInspection")
@Configuration
@ConditionalOnProperty(prefix = MQConstant.PREFIX, value = "type", havingValue = MQConstant.MqType.PULSAR)
public class ListenerContainerConfiguration implements ApplicationContextAware, SmartInitializingSingleton {

    private static final Logger log = LoggerFactory.getLogger(ListenerContainerConfiguration.class);

    /** Application context, injected through {@link #setApplicationContext(ApplicationContext)}. */
    private ConfigurableApplicationContext applicationContext;

    /** Environment handed to every {@link PulsarConsumerFactory} created by this class. */
    private final StandardEnvironment environment;

    private final MQProperties mqProperties;

    /** Sequence used to give each container bean a unique name suffix. */
    private final AtomicLong counter = new AtomicLong(0);

    private final PulsarClient client;

    // NOTE(review): never assigned or read inside this class.
    private MessageConverter messageConverter;

    /**
     * Creates the configuration with the collaborators needed to build listener containers.
     *
     * @param environment  environment passed on to the consumer factory
     * @param mqProperties MQ properties; the Pulsar section is passed on to the consumer factory
     * @param client       Pulsar client shared by all containers
     */
    public ListenerContainerConfiguration(StandardEnvironment environment, MQProperties mqProperties, PulsarClient client) {
        this.environment = environment;
        this.mqProperties = mqProperties;
        this.client = client;
    }

    /**
     * Looks up all beans annotated with {@link PulsarMessageListener}, skips scoped-proxy target
     * beans, and registers a listener container for each remaining bean.
     */
    @Override
    public void afterSingletonsInstantiated() {
        // Work on a detached copy: registerContainer adds new beans to the context while we iterate.
        Map<String, Object> beans = this.applicationContext.getBeansWithAnnotation(PulsarMessageListener.class)
                .entrySet().stream().filter(entry -> !ScopedProxyUtils.isScopedTarget(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

        beans.forEach(this::registerContainer);
    }

    /**
     * Registers and starts a {@link DefaultPulsarListenerContainer} for one listener bean.
     *
     * @param beanName name of the listener bean
     * @param bean     listener bean instance (possibly a proxy)
     * @throws IllegalStateException if the bean does not implement {@link PulsarListener}, or the
     *                               application context is not a {@link GenericApplicationContext}
     * @throws BeanDefinitionValidationException if the annotation is missing or incomplete
     * @throws RuntimeException if the container fails to start
     */
    private void registerContainer(String beanName, Object bean) {
        Class<?> clazz = AopProxyUtils.ultimateTargetClass(bean);
        // Check the bean instance itself (not the target class) because it is the instance that
        // is later cast to PulsarListener.
        if (!(bean instanceof PulsarListener)) {
            throw new IllegalStateException(clazz + " is not instance of " + PulsarListener.class.getName());
        }
        if (!(applicationContext instanceof GenericApplicationContext)) {
            throw new IllegalStateException("Application context must be a "
                    + GenericApplicationContext.class.getName() + " to register listener containers, but was "
                    + (applicationContext == null ? null : applicationContext.getClass().getName()));
        }

        final PulsarMessageListener annotation = resolveAnnotation(beanName, clazz);
        validate(annotation);

        String containerBeanName = String.format("%s_%s", DefaultPulsarListenerContainer.class.getName(), counter.incrementAndGet());
        GenericApplicationContext genericApplicationContext = (GenericApplicationContext) applicationContext;

        genericApplicationContext.registerBean(containerBeanName, DefaultPulsarListenerContainer.class,
                () -> createPulsarListenerContainer(containerBeanName, bean, annotation));

        DefaultPulsarListenerContainer container = genericApplicationContext.getBean(containerBeanName,
                DefaultPulsarListenerContainer.class);
        if (!container.isRunning()) {
            try {
                container.start();
            } catch (Exception e) {
                log.error("Started container failed. {}", container, e);
                throw new RuntimeException("Failed to start listener container " + containerBeanName, e);
            }
        }
        log.info("Register the listener to container, listenerBeanName:{}, containerBeanName:{}", beanName, containerBeanName);
    }

    /**
     * Resolves the {@link PulsarMessageListener} annotation for a listener bean.
     *
     * <p>The annotation declared directly on the target class wins; otherwise the lookup falls
     * back to the bean factory, which is the same facility that selected the bean in the first
     * place and may find the annotation elsewhere in the bean's type hierarchy.
     *
     * @param beanName name of the listener bean
     * @param clazz    ultimate target class of the listener bean
     * @return the annotation, or {@code null} if none could be found
     */
    private PulsarMessageListener resolveAnnotation(String beanName, Class<?> clazz) {
        PulsarMessageListener direct = clazz.getAnnotation(PulsarMessageListener.class);
        if (direct != null) {
            return direct;
        }
        return applicationContext.findAnnotationOnBean(beanName, PulsarMessageListener.class);
    }

    /**
     * Builds a listener container wired with the shared client, the listener bean, its annotation
     * and a new consumer factory.
     *
     * @param containerBeanName name assigned to the container
     * @param bean              listener bean; must implement {@link PulsarListener}
     * @param annotation        listener annotation describing the subscription
     * @return the configured, not yet started container
     */
    private DefaultPulsarListenerContainer createPulsarListenerContainer(String containerBeanName, Object bean, PulsarMessageListener annotation) {

        DefaultPulsarListenerContainer listenerContainer = new DefaultPulsarListenerContainer();
        listenerContainer.setName(containerBeanName);
        listenerContainer.setClient(client);
        listenerContainer.setMessageListener(annotation);
        listenerContainer.setPulsarListener((PulsarListener) bean);
        listenerContainer.setConsumerFactory(new PulsarConsumerFactory(client, environment, mqProperties.getPulsar()));
        return listenerContainer;
    }

    /**
     * Ensures the annotation is present and declares both a topic and a subscription name.
     *
     * @param annotation annotation to check; may be {@code null}
     * @throws BeanDefinitionValidationException if the annotation is missing, or its topic or
     *                                           subscription name is empty
     */
    private void validate(PulsarMessageListener annotation) {
        if (annotation == null) {
            throw new BeanDefinitionValidationException(
                    "Bad annotation definition in @PulsarMessageListener, annotation could not be resolved on the listener bean");
        }
        if(!StringUtils.hasLength(annotation.topic()) || !StringUtils.hasLength(annotation.subscriptionName())) {
            throw new BeanDefinitionValidationException(
                    "Bad annotation definition in @PulsarMessageListener, topic or subscriptionName cannot be null");
        }
    }

    /**
     * Stores the application context for later bean lookup and container registration.
     *
     * @param applicationContext context supplied by Spring; cast to
     *                           {@link ConfigurableApplicationContext}
     * @throws BeansException declared by the {@link ApplicationContextAware} contract
     */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = (ConfigurableApplicationContext) applicationContext;
    }
}
